AIP-96 (draft): add CHECKPOINTED state and AirflowTaskCheckpointed exception#66402
AIP-96 (draft): add CHECKPOINTED state and AirflowTaskCheckpointed exception#664021fanwang wants to merge 8 commits into
Conversation
…ception
Foundation for AIP-96 (Resumable Operators) — adds vocabulary so the
operator-author API surface can be discussed against running code.
- CHECKPOINTED added to TaskInstanceState (intermediate state, in
State.unfinished, lightyellow in the UI)
- AirflowTaskCheckpointed exception with optional checkpoint_data
payload, mirroring the AirflowRescheduleException pattern
(serializable, single domain payload)
- run() catches the exception and reports CHECKPOINTED state to the
supervisor
Persistence of checkpoint_data, scheduler auto-resume semantics, the
listener hook (apache#66410), and downstream trigger-rule integration are
intentionally out of scope here so each can be discussed separately.
882780f to
31e50c7
Compare
…file lives there)
…he#66410; their newsfragments live there)
mypy on task-sdk: TITargetStatePayload.state expects IntermediateTIState, not the broader TaskInstanceState. Both enums have CHECKPOINTED with the same string value 'checkpointed'. Also regenerated the openapi specs and airflowctl datamodel to include the CHECKPOINTED state from PR-E (apache#66402).
Discussion-only — companion to AIP-96 v2 (cwiki) and the AIP-96 PR set (apache#66402 foundation, apache#66410 listener hook, apache#66445 supervisor wiring). NOT FOR MERGE. Demonstrates the v1 integration pattern from AIP-96 v2: a subclass of DatabricksSubmitRunOperator that survives worker disruption by: 1. Persisting self.run_id to AIP-103 task_state after submit_run. 2. Reading prior run_id on next attempt and reconnecting (no resubmit). 3. Converting SIGTERM into AirflowTaskCheckpointed instead of letting on_kill cancel the Databricks run. Imports AirflowTaskCheckpointed from airflow.sdk.exceptions, which is introduced by apache#66402 and not yet on main; this branch is rebased onto main so the diff shows only the new file. The file is self-explanatory as a design illustration; CI on this branch will not pass alone.
Layer 1: test_resumable_databricks_demo.py (~145 lines)
- Mock-based pytest (no real Databricks workspace).
- Asserts the resume contract:
- First execute(): submit_run called, run_id persisted.
- Disruption during poll raises AirflowTaskCheckpointed; run_id persists.
- Second execute() (after CHECKPOINTED): submit_run NOT called; prior
run_id reused; task_state cleared on success.
- on_kill on the resumable variant does NOT cancel the Databricks run.
Layer 2: test_aip96_resumable_pattern.py (~155 lines)
- Provider-agnostic, in-process simulator.
- Demonstrates the AIP-96 + AIP-103 primitives compose correctly without
per-provider code: submit-once, persist-via-task_state, reconnect-on-resume,
repeat-disruption-cycles preserve external_id.
All 8 tests pass against the AIP-96 stack code (apache#66402, apache#66410, apache#66445).
On this rebased-onto-main demo branch the tests will fail to import
AirflowTaskCheckpointed; that's expected — the discussion artifact is
the design, the working tests are evidence.
|
Closing — AIP-103 (Task State Management, accepted Apr 2026) and #67118 (ResumableJobMixin + Spark, in flight) together demonstrate that the resume-on-retry pattern works end-to-end without a new task instance state or a new exception. The operator's no-op The original AIP-96 v1 framing (CHECKPOINTED + AirflowTaskCheckpointed) predated AIP-103. Now that AIP-103 ships AIP-96 will be revised to focus on the standardization layer on top of AIP-103 + #67118: canonical resumable-operator API in |
Filing this AIP draft from the LinkedIn DI side. We run long-lived Spark / Flink / batch jobs where a single TaskInstance can span hours or days; killing and restarting from scratch on retry or scheduler restart is too expensive at our scale. This AIP introduces a
CHECKPOINTEDstate and anAirflowTaskCheckpointedexception so operators can persist mid-task progress and resume.Real-world failure shape driving this
Every operator team at LinkedIn DI running long-lived tasks (Spark transforms, Flink jobs, multi-hour fine-tuning runs) has rolled its own resume-from-checkpoint pattern — HuggingFace's
resume_from_checkpoint, customCheckpointManageradapters against external blob stores, Flink savepoints, and so on. The persistence is solved per-operator. What's missing is a state Airflow understands.Concrete failure shape: a 6-hour Spark transform hits the scheduler-restart window at hour 5. The operator has already written a checkpoint to its external store and is ready to resume from that point. Today Airflow restarts the TaskInstance from scratch because there's no way for the operator to signal "I reached a stable point and want to pause here" — even though the resumption logic exists at the operator layer. The 5 hours of compute is wasted, downstream wait is doubled, and the on-call ergonomics are poor (the failure looks like a generic retry, not a deliberate pause).
The gap closes when Airflow has a state-machine vocabulary for "checkpointed". Listeners, trigger rules, and the UI can then treat the paused state as first-class instead of every operator team rebuilding the resume semantics around an opaque retry. The cost shape is straightforward:
compute-hours-wasted = retry-rate × restart-from-scratch-cost; on long-lived tasks restart-from-scratch dominates the failure-recovery budget. CHECKPOINTED turns those restarts into resumes.AIP-96 (Resumable Operators) is currently in DRAFT on the cwiki. Opening
this as the smallest concrete artifact so the API shape can be discussed
against real code rather than only on the wiki — marking the PR draft for
that reason.
The minimum surface: a
CHECKPOINTEDtask instance state (intermediate,included in
State.unfinishedso trigger rules treat it as not-yet-done)and an
AirflowTaskCheckpointedexception that operators raise to signal"I reached a stable point and want to pause":
The worker catches it in
run()and reports the state. Persistence ofcheckpoint_data, scheduler auto-resume semantics, listener notification,and trigger-rule integration for downstream tasks are intentionally deferred
to follow-ups so each can be discussed without committing to a single
resumption policy.
Scheduler hot-path impact
CHECKPOINTEDis added to the existingState.unfinishedfrozenset only — trigger rule evaluation continues to do O(1) membership checks. No new branching in the scheduler loop, no new SQL, no new iteration cost over task candidates.Scheduler treatment of
CHECKPOINTEDDownstream tasks of a
CHECKPOINTEDupstream wait per the existingState.unfinishedsemantics — same shape asDEFERREDorUP_FOR_RETRYupstream. The foundation deliberately does not introduce auto-resume yet; that decision (auto-resume after delay vs manual-resume-only via API) is the single biggest design knob the AIP discussion needs to settle, and is intentionally left open here.A few questions that probably matter more than the implementation details
above:
AirflowTaskCheckpointedbe aBaseException(likeAirflowTaskTerminated/AirflowTaskTimeout) so usertry/except Exceptionblocks don't accidentally swallow it? Inheritedfrom
AirflowExceptionhere for symmetry with the skip/rescheduleexceptions, but happy to flip.
lightyellow) is a placeholder.AirflowTaskCheckpointedmirrorsAirflowTaskTerminated;CHECKPOINTEDmirrorsDEFERRED. Open to a different name.Testing
task-sdk/tests/task_sdk/execution_time/test_task_runner.pyexercises
run()against threecheckpoint_datashapes(
None, dict, list). Each variant raisesAirflowTaskCheckpointedfrom the operator'sexecute, runsthrough
run(), and asserts the returned state isTaskInstanceState.CHECKPOINTED.AirflowSkipException/AirflowRescheduleExceptioncontinue to pass — the new branch isadditive and lives between the existing skip and reschedule
branches in
run()'s exception chain.^ Add meaningful description above
Read the Pull Request Guidelines for more information.
E2E validation
What this doesn't verify (intentionally — those are out of scope here):
checkpoint_data(a follow-up). Reviewers wanting to see the listener path can look at Add on_task_instance_checkpointed listener hook #66410 (stacked PR) which fireson_task_instance_checkpointed(checkpoint_data=...)fromfinalize().E2E validation
What this doesn't verify (intentionally — those are out of scope here):
checkpoint_data(a follow-up). Reviewers wanting to see the listener path can look at Add on_task_instance_checkpointed listener hook #66410 (stacked PR) which fireson_task_instance_checkpointed(checkpoint_data=...)fromfinalize().Integrated mega-branch validation (all 7 PRs composed)
This PR was independently validated, plus all seven PRs in this stack (#66394, #66395, #66397, #66399, #66402, #66405, #66410) were merged onto a single branch and exercised end-to-end through real services —
airflow standalonerunning scheduler + API server + LocalExecutor + Postgres-equivalent (sqlite for the test). A single listener plugin declaring every new hook and parameter was registered, then 5 DAGs covering every state-transition path were triggered + a manual-set-state PATCH via the public API was issued. The listener log is below — every annotation maps a line to the PR that introduced it:What this validates jointly:
msg=...started,success,failed,skipped,up_for_retry,manually_set_to_failed)error: BaseException | NoneRuntimeError(wasstron PR-A alone)AirflowTaskCheckpointedrunning → checkpointedtransition observed at the listener and at the supervisor message boundaryfailure_detailskwargfailure_details=Noneflowing through every failure (no executor populates yet)checkpointed task=checkpoint_task checkpoint_data={'step': 5, ...}Repro
Bugs surfaced and fixed during this validation
This step caught 6 bugs that the layer-2 unit-test pass missed — every fix is a separate commit on its respective PR's branch:
AirflowTaskCheckpointedimport inrun()(NameError)_generated.pyTaskInstanceStatemissing CHECKPOINTED (AttributeError)TaskStatesupervisor message Literal rejected CHECKPOINTED (PydanticValidationError)_generated.pyIntermediateTIStatemissing CHECKPOINTEDfailure_details=Nonedefault silently received Noneon_task_instance_failedcall site missingfailure_detailskwarg (HookCallError: hook call must provide argument)Last two would have broken every task failure on apache/airflow
mainif the foundation PRs landed without the call-site fixes. The standalone-against-editable-install harness is a fast catch for this class.Supervisor wiring follow-up — #66445
The original gap noted here (CHECKPOINTED not in
STATES_SENT_DIRECTLY,TITargetStatePayloadnot handled by the API server's state-update endpoint) is now addressed in stacked PR #66445. With #66445 applied, the DB row lands atstate=checkpointedend-to-end through realairflow standaloneservices.checkpoint_datapersistence and scheduler auto-resume semantics remain deliberately open for the AIP discussion.